Kafka

Created by Jay Kreps and team at LinkedIn

Kafka architecture


A typical Kafka cluster consists of multiple brokers.

  • They load-balance message reads and writes across the cluster. Each broker is itself stateless;

    brokers rely on ZooKeeper to maintain cluster state.

  • Each topic partition has one broker as its leader and zero or more brokers as followers. The leader handles all read and write requests for its partition, while followers replicate the leader in the background without interfering with its work.

    Think of followers as backups for the leader: if the leader fails, one of the followers is elected as the new leader. A sketch of how to list each partition's leader and replicas follows this list.
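A minimal sketch of how to check this from code using Kafka's AdminClient (an illustration, not part of the producer application below; the broker address localhost:9092 and topic plume_pollution are assumed placeholders):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient

object DescribePartitions extends App {

  // assumed local broker; adjust to your cluster
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")

  val admin = AdminClient.create(props)

  // describe the topic and print the leader/follower assignment per partition
  val description = admin.describeTopics(java.util.Arrays.asList("plume_pollution"))
    .all().get()
    .get("plume_pollution")

  description.partitions().asScala.foreach { p =>
    val followers = p.replicas().asScala.filterNot(_.id == p.leader().id)
    println(s"partition ${p.partition()}: leader=${p.leader().id} followers=${followers.map(_.id).mkString(",")}")
  }

  admin.close()
}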

Producers push data to brokers.

When publishing data, a producer looks up the elected leader (broker) of the relevant topic partition and automatically sends the message to that leader.

Consumers read messages from brokers.

Consumers maintain their state with the help of ZooKeeper, since Kafka brokers are stateless.

  • This design helps Kafka scale well. The consumer offset value is maintained in ZooKeeper.
  • A consumer keeps track of how many messages it has consumed using the partition offset, and ultimately acknowledges that offset to ZooKeeper, meaning the consumer has consumed all prior messages. A minimal consumer sketch follows this list.
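A minimal consumer sketch using the current Java consumer API (not part of the original code; the broker address, group id and topic are assumptions). Note that this client commits offsets to Kafka itself, while the older high-level consumer stored them in ZooKeeper as described above:

import java.time.Duration
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object PollutionConsumer extends App {

  // assumed broker, group id and topic; adjust to your setup
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "plume_pollution_readers")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("enable.auto.commit", "false") // commit offsets explicitly below

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(java.util.Arrays.asList("plume_pollution"))

  while (true) {
    val records = consumer.poll(Duration.ofMillis(1000))
    records.asScala.foreach { record =>
      println(s"partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
    }
    // acknowledge the consumed offsets so the group does not reprocess them
    consumer.commitSync()
  }
}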

Database analogy

Database tables correspond to topics in Kafka,
applications that insert data into tables are Producers, and
applications that read data from them are Consumers.

Zookeeper


Kafka cannot work without ZooKeeper. Kafka uses ZooKeeper for the following:

  • Choosing a Controller: The controller is the broker responsible for partition management: leader election, topic creation, partition creation and replica management. When a node or server shuts down, the controller elects new partition leaders from the followers. Kafka uses ZooKeeper's metadata to elect the controller, and ZooKeeper ensures that a new controller is elected if the current one crashes. (A sketch of how to query the current controller follows this list.)
  • Brokers Metadata: ZooKeeper maintains the state of each broker that is part of the Kafka cluster, along with all relevant metadata about each broker. Producers and consumers interact with ZooKeeper to obtain this broker state.
  • Topic Metadata: ZooKeeper also maintains topic metadata such as the number of partitions, specific configuration parameters, and so on.
  • Client Quota Information: Newer versions of Kafka introduced quotas. Quotas enforce byte-rate thresholds on clients reading from and writing to Kafka topics. All of this quota information and state is maintained in ZooKeeper.
  • Kafka Topic ACLs: Kafka has a built-in authorization module based on Access Control Lists (ACLs). These ACLs determine user roles and what read and write permissions each of those roles has on the respective topics. Kafka uses ZooKeeper to store all ACLs.
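As a small illustration of the controller and broker metadata (an assumed sketch, not from the original notes; the broker address is a placeholder), the AdminClient can report which broker currently holds the controller role and list the brokers in the cluster:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient

object DescribeCluster extends App {

  // assumed local broker; adjust to your cluster
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")

  val admin = AdminClient.create(props)
  val cluster = admin.describeCluster()

  // broker that currently holds the controller role
  val controller = cluster.controller().get()
  println(s"controller: id=${controller.id} host=${controller.host}:${controller.port}")

  // metadata for every broker in the cluster
  cluster.nodes().get().asScala.foreach { node =>
    println(s"broker: id=${node.id} host=${node.host}:${node.port}")
  }

  admin.close()
}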


Scala code


**Main** takes arguments and starts the producer application

import java.util.Properties

import scala.io.Source

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object KafkaBroker {


  case class Coordinates(lat: Double, lon: Double)

  def main(args: Array[String]): Unit = {

    // parameters
    val topic = args(0) // plume_pollution
    val brokers = args(1) // localhost:9092 - "broker1:port,broker2:port"
    val lat = args(2).toDouble // latitude - test value: 48.85
    val lon = args(3).toDouble // longitude - test value: 2.294
    val sleepTime = args(4).toInt // 1000 - time between queries to API

    // use 'lat' and 'lon' to create a Coordinates object
    val location = Coordinates(lat, lon)

    startIngestion(brokers, topic, location, sleepTime)


  } // end of main

**startBroker** creates a new KafkaProducer with the specified properties

Below are the three mandatory configuration parameters:

  1. bootstrap.servers: A list of Kafka broker addresses, each specified as hostname:port. You can specify one or more brokers, but it is recommended to provide at least two so that the producer can fall back to another if one broker goes down.

  2. key.serializer: Messages are sent to Kafka brokers as key-value pairs, and brokers expect the key and value as byte arrays. This property tells the producer which serializer class to use to convert the message key into a byte array. Kafka ships with three built-in serializers, ByteArraySerializer, StringSerializer and IntegerSerializer, all of which live in the org.apache.kafka.common.serialization package and implement the Serializer interface.

  3. value.serializer: Similar to key.serializer, but this property tells the producer which class to use to serialize the message value. You can also implement your own serializer class and assign it to this property; a minimal sketch follows this list.
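A minimal sketch of such a custom serializer (hypothetical, not part of the original code), serializing the Coordinates case class defined in KafkaBroker above as a small JSON string:

import java.nio.charset.StandardCharsets

import org.apache.kafka.common.serialization.Serializer

// hypothetical serializer for the Coordinates case class defined in KafkaBroker
class CoordinatesSerializer extends Serializer[KafkaBroker.Coordinates] {

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()

  // encode the coordinates as a small JSON string and return its UTF-8 bytes
  override def serialize(topic: String, data: KafkaBroker.Coordinates): Array[Byte] =
    if (data == null) null
    else s"""{"lat":${data.lat},"lon":${data.lon}}""".getBytes(StandardCharsets.UTF_8)

  override def close(): Unit = ()
}

To use it, set props.put("value.serializer", classOf[CoordinatesSerializer].getName) and type the producer as KafkaProducer[String, KafkaBroker.Coordinates].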

/**
    * Helper function to create a KafkaProducer using brokers ip and port
    *
    * @param brokers Broker information in the format 'localhost:9092'
    *                or "broker1:port,broker2:port"
    *
    * @return KafkaProducer[String, String]
    */

  def startBroker(brokers:String): KafkaProducer[String, String] = {

    // Kafka Broker properties
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("client.id", "ScalaKafkaProducer")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props.put("retries", new Integer(1))
    //    props.put("batch.size", new Integer(16384))
    //    props.put("linger.ms", new Integer(1))
    //    props.put("buffer.memory", new Integer(133554432))

    // TODO: implement ProducerCallback()

    new KafkaProducer[String, String](props)

  } // end of startBroker

**startIngestion** has the following objectives:

  • Load the Plume.io token
  • Initialize the KafkaProducer
  • Create a ProducerRecord by querying the API at the specified interval
    • A ProducerRecord contains a topic name, partition number, timestamp, key and value.
      • Optional: partition number, timestamp and key
      • Mandatory: the topic to which data will be sent and the value which carries the data (see the constructor sketch after this list)
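For reference, a short sketch of the ProducerRecord constructors showing which fields are optional (the topic name, key, partition and payload here are placeholders, not from the original code):

import org.apache.kafka.clients.producer.ProducerRecord

object ProducerRecordExamples extends App {

  val pollutionJson = """{"pollution": "..."}""" // placeholder payload

  // mandatory fields only: topic and value
  val minimal = new ProducerRecord[String, String]("plume_pollution", pollutionJson)

  // optional key: records with the same key always go to the same partition
  val keyed = new ProducerRecord[String, String]("plume_pollution", "paris", pollutionJson)

  // optional partition (boxed explicitly for the Java constructor) and key
  val pinned = new ProducerRecord[String, String]("plume_pollution", Integer.valueOf(0), "paris", pollutionJson)

  // optional partition, timestamp and key
  val timestamped = new ProducerRecord[String, String](
    "plume_pollution", Integer.valueOf(0), java.lang.Long.valueOf(System.currentTimeMillis()), "paris", pollutionJson)

  println(Seq(minimal, keyed, pinned, timestamped).mkString("\n"))
}
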
/**
    * Queries plume pollution API for a particular 'location' (lat, long) in an interval defined by 'sleepTime'
    * and creates a KafkaProducer to ingest content
    *
    * @param brokers Broker information in the format 'localhost:9092'
    *                or "broker1:port,broker2:port"
    * @param topic Topic to publish message to
    * @param location Latitude and Longitude to query pollution
    * @param sleepTime Time interval between queries to plume API
    *
    */


  def startIngestion(brokers:String, topic:String, location: Coordinates, sleepTime: Int) = {

    // access plume token https://github.com/zipfian/cartesianproduct2/wiki/TOKEN
    lazy val token:Option[String] = sys.env.get("PLUMETOKEN") orElse {
      println("No token found. Check how to set it up at https://github.com/zipfian/cartesianproduct2/wiki/TOKEN")
      None
    }

    while (true){

      // create producer with 'props' properties
      val producer = startBroker(brokers)

      // query web API - response will be a String
      val response = Source.fromURL(
        "https://api.plume.io/1.0/pollution/forecast?token="+ token.get +"&lat="+ location.lat +"&lon="+ location.lon
      ).mkString

      val producerRecord = new ProducerRecord[String, String](topic, response)
      val recordMetadata = producer.send(producerRecord)

      val meta = recordMetadata.get() // I could use this to write some tests
      val msgLog =
        s"""
           |topic     = ${meta.topic()}
           |offset    = ${meta.offset()}
           |partition = ${meta.partition()}
          """.stripMargin
      println(msgLog)

      producer.close()

      // pause between queries ('sleepTime' is passed in as an argument)
      Thread.sleep(sleepTime)

    } // end of infinity loop


  } // end of startIngestion

} // end of KafkaBroker object

Producer Callback

Kafka provides a Callback interface that lets us handle the broker's reply to each message, whether it failed or succeeded. The callback is passed as the second argument to send (a usage sketch follows the class below):

  • send(ProducerRecord, new Callback())
class ProducerCallback extends Callback {

    override def onCompletion(recordMetadata: RecordMetadata, ex: Exception): Unit = {
        if (ex != null) {
            // an error occurred while sending - log it (or retry / alert as needed)
            ex.printStackTrace()
        }
        else {
            // same logging as in startIngestion, but here the metadata is delivered
            // directly to the callback, so no blocking get() is needed
            val msgLog =
                s"""
                   |topic     = ${recordMetadata.topic()}
                   |offset    = ${recordMetadata.offset()}
                   |partition = ${recordMetadata.partition()}
                  """.stripMargin
            println(msgLog)

        } // end of else
    } // end of onCompletion
} // end of ProducerCallback class
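To wire the callback in, here is a minimal usage sketch (the broker address localhost:9092 and topic plume_pollution are assumed placeholders): pass a ProducerCallback instance as the second argument to send, which makes the send fully asynchronous and removes the need for the blocking recordMetadata.get() call used in startIngestion.

import org.apache.kafka.clients.producer.ProducerRecord

object CallbackExample extends App {

  // reuse the helper from the KafkaBroker object above; broker address is an assumption
  val producer = KafkaBroker.startBroker("localhost:9092")
  val record = new ProducerRecord[String, String]("plume_pollution", """{"pollution": "..."}""")

  // send asynchronously; ProducerCallback.onCompletion runs once the broker acknowledges the record
  producer.send(record, new ProducerCallback())

  producer.flush()
  producer.close()
}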


Kafka Bash commands
Links to documentation (Kafka tutorial)
Link to Scala documentation
Link to Resources

